package com.jhf.youke.saga.domain.service;

import cn.hutool.json.JSONUtil;
import com.jhf.youke.core.ddd.AbstractDomainService;
import com.jhf.youke.core.ddd.DomainMq;
import com.jhf.youke.core.entity.*;
import com.jhf.youke.core.utils.CacheUtils;
import com.jhf.youke.core.utils.Constant;
import com.jhf.youke.core.utils.StringUtils;
import com.jhf.youke.saga.domain.converter.SagaConverter;
import com.jhf.youke.saga.domain.converter.SagaTemplateListConverter;
import com.jhf.youke.saga.domain.gateway.SagaMessageRepository;
import com.jhf.youke.saga.domain.gateway.SagaRepository;
import com.jhf.youke.saga.domain.gateway.SagaTemplateListRepository;
import com.jhf.youke.saga.domain.gateway.SagaUnusualRepository;
import com.jhf.youke.saga.domain.model.Do.SagaDo;
import com.jhf.youke.saga.domain.model.Do.SagaTemplateListDo;
import com.jhf.youke.saga.domain.model.po.SagaMessagePo;
import com.jhf.youke.saga.domain.model.po.SagaPo;
import com.jhf.youke.saga.domain.model.po.SagaTemplateListPo;
import com.jhf.youke.saga.domain.model.po.SagaUnusualPo;
import com.jhf.youke.saga.domain.model.vo.SagaVo;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.*;


/**
 * @author RHJ
 */
@Service
@Log4j2
public class SagaService extends AbstractDomainService<SagaRepository, SagaDo, SagaVo> {


    @Resource
    SagaConverter sagaConverter;

    @Resource
    private SagaUnusualRepository sagaUnusualRepository;

    @Resource
    private SagaTemplateListRepository sagaTemplateListRepository;

    @Resource
    private SagaTemplateListConverter sagaTemplateListConverter;

    @Resource
    private DomainMq domainMq;

    @Resource
    private SagaMessageRepository sagaMessageRepository;

    @Override
    public boolean update(SagaDo entity) {
        SagaPo sagaPo = sagaConverter.do2Po(entity);
        sagaPo.preUpdate();
        return repository.update(sagaPo);
    }

    @Override
    public boolean updateBatch(List<SagaDo> doList) {
        List<SagaPo> poList = sagaConverter.do2PoList(doList);
        return repository.updateBatch(poList);
    }

    @Override
    public boolean delete(SagaDo entity) {
        SagaPo sagaPo = sagaConverter.do2Po(entity);
        return repository.delete(sagaPo);
    }

    @Override
    public boolean deleteBatch(List<Long> idList) {
        return repository.deleteBatch(idList);
    }

    @Override
    public boolean insert(SagaDo entity) {
        SagaPo sagaPo = sagaConverter.do2Po(entity);
        sagaPo.preInsert();
        return repository.insert(sagaPo);
    }

    @Override
    public boolean insertBatch(List<SagaDo> doList) {
        List<SagaPo> poList = sagaConverter.do2PoList(doList);
        poList = SagaPo.getInsertListId(poList);
        return repository.insertBatch(poList);
    }

    @Override
    public Optional<SagaVo> findById(Long id) {
        Optional<SagaPo> sagaPo = repository.findById(id);
        SagaVo sagaVo = sagaConverter.po2Vo(sagaPo.orElse(new SagaPo()));
        return Optional.ofNullable(sagaVo);
    }

    @Override
    public boolean remove(Long id) {
        return repository.remove(id);
    }

    @Override
    public boolean removeBatch(List<Long> idList) {
        return repository.removeBatch(idList);
    }

    @Override
    public List<SagaVo> findAllMatching(SagaDo entity) {
        SagaPo sagaPo = sagaConverter.do2Po(entity);
        List<SagaPo> sagaPoList = repository.findAllMatching(sagaPo);
        return sagaConverter.po2VoList(sagaPoList);
    }


    @Override
    public Pagination<SagaVo> selectPage(SagaDo entity) {
        SagaPo sagaPo = sagaConverter.do2Po(entity);
        PageQuery<SagaPo> pageQuery = new PageQuery<>(sagaPo, entity.getCurrentPage(), entity.getPageSize(), entity.getQuerySort());
        Pagination<SagaPo> pagination = repository.selectPage(pageQuery);
        return new Pagination<>(pagination.getPageNum(), pagination.getPageSize(), pagination.getTotalSize(),
                sagaConverter.po2VoList(pagination.getList()));
    }

    /**
     * 应答
     *
     * @param map 地图
     * @return {@link SagaDo}
     */
    public SagaDo reply(Map<String, Object> map) {
        Integer sort = StringUtils.toInteger(map.get("sort"));
        Integer status = StringUtils.toInteger(map.get("status"));
        log.info("reply status:", status);
        //如果是消费成功，则返回下个节点
        if (SagaDo.CONSUME_OK.equals(status)) {
            List<SagaDo> list = getNextList(map);
            SagaDo next = next(list, sort);
            return next;
        } else {
            //应答失败，记录日志
            SagaDo s = new SagaDo(map);
            SagaUnusualPo sagaUnusualPo = sagaConverter.do2UnusualPo(s);
            sagaUnusualRepository.insert(sagaUnusualPo);
        }
        return null;
    }

    private List<SagaDo> getNextList(Map<String, Object> map) {
        List<SagaPo> sagaPos = repository.getNextList(map);
        List<SagaDo> list = sagaConverter.po2DoList(sagaPos);
        return list;
    }

    public SagaDo next(List<SagaDo> list, Integer sort) {
        boolean next = false;
        for (SagaDo saga : list) {
            log.info("sort : {}", saga.getSort());
            if (next) {
                saga.setIfCurrent(SagaDo.IS_CURRENT);
                SagaPo sagaPo = sagaConverter.do2Po(saga);
                repository.update(sagaPo);
                log.info("next saga update");
                return saga;
            }

            // 如果是当前节点，更新当前节点
            if (saga.getSort().equals(sort)) {
                next = true;
                saga.setStatus(SagaDo.REPLY_OK);
                if (!SagaDo.IS_END.equals(saga.getIfEnd())) {
                    saga.setIfCurrent(SagaDo.IS_NOT_CURRENT);
                }
                log.info("current saga sort: {}, status: {}, biz_id :{}", saga.getSort(), saga.getStatus(), saga.getBizId());
                SagaPo sagaPo = sagaConverter.do2Po(saga);
                repository.update(sagaPo);
                log.info("current saga update");
            }

        }
        return null;
    }


    public SagaDo create(SagaMsg msg, List<SagaTemplateListDo> sagaTemplateLists) {

        List<SagaDo> sagaList = new ArrayList<>();
        for (SagaTemplateListDo template : sagaTemplateLists) {
            SagaDo s = new SagaDo(msg, template);
            sagaList.add(s);
        }
        insertBatch(sagaList);
        return sagaList.get(0);
    }


    public List<SagaTemplateListDo> getTemplateList(SagaMsg msg) {
        List<SagaTemplateListPo> sagaTemplateListPos = sagaTemplateListRepository.getTemplateList(msg.getCode());
        List<SagaTemplateListDo> list = sagaTemplateListConverter.po2DoList(sagaTemplateListPos);
        return list;
    }

    public List<SagaDo> getListByBiz(String code, Long bizId, Integer objectId) {
        List<SagaPo> sagaPos = repository.getListByBiz(code, bizId, objectId);
        List<SagaDo> list = sagaConverter.po2DoList(sagaPos);
        return list;
    }

    public String sendMq(SagaDo saga, Map<String, Object> msg) {
        Message message = new Message();
        message.setTopic(saga.getTopic());
        message.setTag("all");
        message.setGroupName("sagaGroup");
        Map<String, Object> map = new HashMap<>(16);
        map.put("sort", saga.getSort());
        map.put("code", saga.getCode());
        map.put("topic", saga.getTopic());
        map.put("bizId", saga.getBizId());
        map.put("objectId", saga.getObjectId());
        map.put("status", saga.getStatus());
        map.put("ifCurrent", saga.getIfCurrent());
        map.put("ifEnd", saga.getIfEnd());
        map.put("messageId", saga.getMessageId());
        map.put("message", msg);
        message.setMessages(map);
        return JSONUtil.toJsonStr(message);
    }


    public RedisHashItemDto setHashItem(SagaDo saga) {
        RedisHashItemDto dto = new RedisHashItemDto();
        dto.setKey("saga_abnormal");
        dto.setItem(saga.getId().toString());
        dto.setValue(JSONUtil.toJsonStr(saga));
        dto.setTime(20 * 60 * 60);
        return dto;
    }


    public void abnormalHandling(Long id) {
        Optional<SagaPo> sagaPo = repository.findById(id);
        assert !sagaPo.isEmpty() : "id is not find saga entity";
        SagaPo saga = sagaPo.get();
        SagaMessagePo msg = sagaMessageRepository.getMessage(saga.getCode(), saga.getBizId(), saga.getObjectId());

        if (SagaDo.SAGA_CREATE.equals(saga.getStatus()) || SagaDo.SEND_FAIL.equals(saga.getStatus())) {

            log.info("reply send mq message {}", JSONUtil.toJsonStr(msg));
            SagaDo sagaDo = sagaConverter.po2Do(saga);
            SendResult result = domainMq.send(sendMq(sagaDo, getMsg(sagaDo)));
            // 发送成功，再更新saga状态
            if (Constant.RESPONSE_OK.equals(result.getCode())) {
                log.info("mq send ok, messageId {}", result.getData().getMsgId());
                sagaDo.setMessageId(result.getData().getMsgId());
                sagaDo.setStatus(SagaDo.SEND_OK);
            } else {
                sagaDo.setStatus(SagaDo.SEND_FAIL);
                log.info("mq send fail");
            }
            update(sagaDo);
        }
    }


    public Boolean updateByBiz(SagaDo saga) {
        SagaPo sagaPo = sagaConverter.do2Po(saga);
        return repository.updateByBiz(sagaPo);
    }


    public Map<String, Object> getMsg(SagaDo saga) {
        String key = "saga_msg_" + saga.getCode() + "_" + saga.getBizId();
        Map<String, Object> map = null;
        try {
            map = CacheUtils.getObject(key, Map.class);
        } catch (Exception e) {
            log.info(e.getMessage());
        }

        if (map == null || map.isEmpty()) {
            SagaMessagePo message = sagaMessageRepository.getMessage(saga.getCode(), saga.getBizId(), saga.getObjectId());
            if (message != null && message.getMessage() != null) {
                CacheUtils.set(key, message.getMessage(), 20 * 60 * 60);
                String m = message.getMessage();
                map = JSONUtil.toBean(m, Map.class);
                return map;
            } else {
                return new HashMap<>(8);
            }
        }
        return map;
    }

}

